compio_driver\sys\driver\iocp/
mod.rs1use std::{
2 collections::HashMap, marker::PhantomData, os::windows::io::AsRawHandle, sync::Arc,
3 time::Duration,
4};
5
6use flume::{Receiver, Sender};
7use windows_sys::Win32::{Foundation::ERROR_OPERATION_ABORTED, System::IO::OVERLAPPED};
8
9use crate::{
10 AsyncifyPool, DriverType, Entry, ErasedKey, ProactorBuilder,
11 control::Carrier,
12 sys::{driver::AwakeFlag, extra::IocpExtra, prelude::*},
13};
14
15mod cp;
16mod wait;
17
18mod_use![op];
19
20pub enum OpType {
22 Overlapped,
24 Blocking,
27 Event(RawFd),
31}
32
33pub(crate) struct Driver {
35 notify: Arc<Notify>,
36 waits: HashMap<usize, wait::Wait>,
37 pool: AsyncifyPool,
38 completed_tx: Sender<Entry>,
39 completed_rx: Receiver<Entry>,
40 _local_marker: PhantomData<ErasedKey>,
41}
42
43impl Driver {
44 pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
45 instrument!(compio_log::Level::TRACE, "new", ?builder);
46
47 let port = cp::Port::new()?;
48 let driver = port.as_raw_handle() as _;
49 let overlapped = Overlapped::new(driver);
50 let notify = Arc::new(Notify::new(port, overlapped));
51 let (completed_tx, completed_rx) = flume::unbounded();
52
53 Ok(Self {
54 notify,
55 completed_tx,
56 completed_rx,
57 waits: HashMap::default(),
58 pool: builder.create_or_get_thread_pool(),
59 _local_marker: PhantomData,
60 })
61 }
62
63 pub fn driver_type(&self) -> DriverType {
64 DriverType::IOCP
65 }
66
67 fn port(&self) -> &cp::Port {
68 &self.notify.port
69 }
70
71 pub(in crate::sys) fn default_extra(&self) -> IocpExtra {
72 IocpExtra::new(self.port().as_raw_handle() as _)
73 }
74
75 pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
76 self.port().attach(fd)
77 }
78
79 pub fn cancel(&mut self, key: ErasedKey) {
80 instrument!(compio_log::Level::TRACE, "cancel", ?key);
81 trace!("cancel RawOp");
82 let optr = key.borrow().extra_mut().optr();
83 if let Some(w) = self.waits.get_mut(&key.as_raw())
84 && w.cancel().is_ok()
85 {
86 self.port().post_raw(optr).ok();
89 }
90 trace!("call OpCode::cancel");
91 key.borrow().carrier.cancel(optr.cast()).ok();
93 }
94
95 pub fn push(&mut self, key: ErasedKey) -> Poll<io::Result<usize>> {
96 instrument!(compio_log::Level::TRACE, "push", ?key);
97 trace!("push RawOp");
98 let mut op = key.borrow();
99 let optr = op.extra_mut().optr();
100 let op_type = op.carrier.op_type();
101 match op_type {
102 OpType::Overlapped => unsafe {
103 let res = op.carrier.operate(optr.cast());
104 drop(op);
105 if res.is_pending() {
106 key.into_raw();
107 }
108 res
109 },
110 OpType::Blocking => {
111 drop(op);
112 self.push_blocking(key);
113 Poll::Pending
114 }
115 OpType::Event(e) => {
116 drop(op);
117 self.waits
118 .insert(key.as_raw(), wait::Wait::new(self.notify.clone(), e, key)?);
119 Poll::Pending
120 }
121 }
122 }
123
124 fn push_blocking(&mut self, key: ErasedKey) {
125 let notify = self.notify.clone();
126 let tx = self.completed_tx.clone();
127
128 let mut key = unsafe { key.freeze() };
130
131 let mut closure = move || {
132 let res = key.as_mut().operate_blocking();
133 let entry = Entry::new(key.into_inner(), res);
134 _ = tx.send(entry);
135 notify.wake();
136 };
137
138 while let Err(e) = self.pool.dispatch(closure) {
139 closure = e.0;
140 std::thread::yield_now();
141 }
142 }
143
144 pub fn flush(&mut self) -> bool {
145 self.notify.reset()
146 }
147
148 fn create_entry(
149 notify: *const Overlapped,
150 waits: &mut HashMap<usize, wait::Wait>,
151 entry: cp::RawEntry,
152 ) -> Option<Entry> {
153 if entry.overlapped.cast_const() == notify {
154 return None;
155 }
156
157 let entry = Entry::new(
158 unsafe { ErasedKey::from_optr(entry.overlapped) },
159 entry.result,
160 );
161
162 let Some(w) = waits.remove(&entry.user_data()) else {
164 return Some(entry);
165 };
166
167 let entry = if w.is_cancelled() {
168 Entry::new(
169 entry.into_key(),
170 Err(io::Error::from_raw_os_error(ERROR_OPERATION_ABORTED as _)),
171 )
172 } else if entry.result.is_err() {
173 entry
174 } else {
175 let key = entry.into_key();
176 let result = key.borrow().operate_blocking();
177 Entry::new(key, result)
178 };
179
180 Some(entry)
181 }
182
183 pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
184 instrument!(compio_log::Level::TRACE, "poll", ?timeout);
185
186 let notify = &self.notify.overlapped as *const Overlapped;
187
188 let mut has_entry = false;
189 while let Ok(entry) = self.completed_rx.try_recv() {
190 entry.notify();
191 has_entry = true;
192 }
193 if self.notify.reset() {
194 has_entry = true;
195 }
196
197 if !has_entry {
198 for e in self.notify.port.poll(timeout)? {
199 if let Some(e) = Self::create_entry(notify, &mut self.waits, e) {
200 self.notify.set_awake();
201 e.notify()
202 }
203 }
204 }
205 self.notify.set_awake();
206
207 Ok(())
208 }
209
210 pub fn waker(&self) -> Waker {
211 Waker::from(self.notify.clone())
212 }
213
214 pub fn pop_multishot(&mut self, _: &ErasedKey) -> Option<BufResult<usize, crate::sys::Extra>> {
215 None
216 }
217}
218
219impl AsRawFd for Driver {
220 fn as_raw_fd(&self) -> RawFd {
221 self.port().as_raw_handle() as _
222 }
223}
224
225pub(crate) struct Notify {
227 port: cp::Port,
228 overlapped: Overlapped,
229 awake: AwakeFlag,
230}
231
232impl Notify {
233 fn new(port: cp::Port, overlapped: Overlapped) -> Self {
234 Self {
235 port,
236 overlapped,
237 awake: AwakeFlag::new(),
238 }
239 }
240
241 fn set_awake(&self) {
242 self.awake.set();
243 }
244
245 fn reset(&self) -> bool {
246 self.awake.reset()
247 }
248}
249
250impl Wake for Notify {
251 fn wake(self: Arc<Self>) {
252 self.wake_by_ref();
253 }
254
255 fn wake_by_ref(self: &Arc<Self>) {
256 if !self.awake.wake() {
257 self.port.post_raw(&self.overlapped).ok();
258 }
259 }
260}